{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "df866b15-7d19-4c9b-80dc-19dce0255440",
   "metadata": {},
   "outputs": [],
   "source": [
    "import re\n",
    "from pathlib import Path\n",
    "from collections import defaultdict\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "import gc"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1c2da73d-fe88-41db-9421-89387f4abc09",
   "metadata": {},
   "source": [
    "# 2025"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1361c248-b282-46b9-b98b-fbc6629c2849",
   "metadata": {},
   "source": [
    "## 1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "24d78305-dd5b-4e24-8033-02688d8388a1",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Base folder\n",
    "base_path = Path(r\"E:\\불평등 연구\\데이터\\59_출퇴근불평등\\이동데이터\\2025\\생활이동_자치구_202501\")\n",
    "\n",
    "# Find all 24 hourly CSV files\n",
    "files = sorted(base_path.glob(\"생활이동_자치구_2025.01_??시.csv\"))\n",
    "\n",
    "# Read and combine\n",
    "dfs = []\n",
    "for file in files:\n",
    "    df = pd.read_csv(file, encoding=\"cp949\")  # encoding might be 'cp949' for Korean\n",
    "    dfs.append(df)\n",
    "\n",
    "combined_df = pd.concat(dfs, ignore_index=True)\n",
    "\n",
    "# Save combined CSV\n",
    "out_file = base_path / \"생활이동_자치구_202501_통합.csv\"\n",
    "combined_df.to_csv(out_file, index=False, encoding=\"utf-8-sig\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c6dae8fd-5586-408a-971c-9b530fe77981",
   "metadata": {},
   "source": [
    "## 2"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "30ce07b7-6e64-47ac-a359-1e4d2c940573",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Base folder\n",
    "base_path = Path(r\"E:\\불평등 연구\\데이터\\59_출퇴근불평등\\이동데이터\\2025\\생활이동_자치구_202502\")\n",
    "\n",
    "# Find all 24 hourly CSV files\n",
    "files = sorted(base_path.glob(\"생활이동_자치구_2025.02_??시.csv\"))\n",
    "\n",
    "# Read and combine\n",
    "dfs = []\n",
    "for file in files:\n",
    "    df = pd.read_csv(file, encoding=\"cp949\")  # encoding might be 'cp949' for Korean\n",
    "    dfs.append(df)\n",
    "\n",
    "combined_df = pd.concat(dfs, ignore_index=True)\n",
    "\n",
    "# Save combined CSV\n",
    "out_file = base_path / \"생활이동_자치구_202502_통합.csv\"\n",
    "combined_df.to_csv(out_file, index=False, encoding=\"utf-8-sig\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "53e35582-96e7-455e-8c0b-930e5d4a9d58",
   "metadata": {},
   "source": [
    "## 3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "cc382599-d288-4b37-ae4d-e99a500d211a",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Base folder\n",
    "base_path = Path(r\"E:\\불평등 연구\\데이터\\59_출퇴근불평등\\이동데이터\\2025\\생활이동_자치구_202503\")\n",
    "\n",
    "# Find all 24 hourly CSV files\n",
    "files = sorted(base_path.glob(\"생활이동_자치구_2025.03_??시.csv\"))\n",
    "\n",
    "# Read and combine\n",
    "dfs = []\n",
    "for file in files:\n",
    "    df = pd.read_csv(file, encoding=\"cp949\")  # encoding might be 'cp949' for Korean\n",
    "    dfs.append(df)\n",
    "\n",
    "combined_df = pd.concat(dfs, ignore_index=True)\n",
    "\n",
    "# Save combined CSV\n",
    "out_file = base_path / \"생활이동_자치구_202503_통합.csv\"\n",
    "combined_df.to_csv(out_file, index=False, encoding=\"utf-8-sig\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c1e76e71-5bb3-466b-955e-74c9072ca2e4",
   "metadata": {},
   "source": [
    "## 4"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "1a28d225-c190-40d3-a495-03d100518758",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Base folder\n",
    "base_path = Path(r\"E:\\불평등 연구\\데이터\\59_출퇴근불평등\\이동데이터\\2025\\생활이동_자치구_202504\")\n",
    "\n",
    "# Find all 24 hourly CSV files\n",
    "files = sorted(base_path.glob(\"생활이동_자치구_2025.04_??시.csv\"))\n",
    "\n",
    "# Read and combine\n",
    "dfs = []\n",
    "for file in files:\n",
    "    df = pd.read_csv(file, encoding=\"cp949\")  # encoding might be 'cp949' for Korean\n",
    "    dfs.append(df)\n",
    "\n",
    "combined_df = pd.concat(dfs, ignore_index=True)\n",
    "\n",
    "# Save combined CSV\n",
    "out_file = base_path / \"생활이동_자치구_202504_통합.csv\"\n",
    "combined_df.to_csv(out_file, index=False, encoding=\"utf-8-sig\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a3d70bc1-52b3-4d0d-b84a-c2ba8f5f1e81",
   "metadata": {},
   "source": [
    "## 5"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "3449b9cc-27b8-439a-a11a-5dfd5466ac9a",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Base folder\n",
    "base_path = Path(r\"E:\\불평등 연구\\데이터\\59_출퇴근불평등\\이동데이터\\2025\\생활이동_자치구_202505\")\n",
    "\n",
    "# Find all 24 hourly CSV files\n",
    "files = sorted(base_path.glob(\"생활이동_자치구_2025.05_??시.csv\"))\n",
    "\n",
    "# Read and combine\n",
    "dfs = []\n",
    "for file in files:\n",
    "    df = pd.read_csv(file, encoding=\"cp949\")  # encoding might be 'cp949' for Korean\n",
    "    dfs.append(df)\n",
    "\n",
    "combined_df = pd.concat(dfs, ignore_index=True)\n",
    "\n",
    "# Save combined CSV\n",
    "out_file = base_path / \"생활이동_자치구_202505_통합.csv\"\n",
    "combined_df.to_csv(out_file, index=False, encoding=\"utf-8-sig\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a67f20a1-79fe-4678-8cd7-f7adc24d7f5c",
   "metadata": {},
   "source": [
    "## 6"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "468e0ec4-7ea9-4746-9b10-a65fd54e8583",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Base folder\n",
    "base_path = Path(r\"E:\\불평등 연구\\데이터\\59_출퇴근불평등\\이동데이터\\2025\\생활이동_자치구_202506\")\n",
    "\n",
    "# Find all 24 hourly CSV files\n",
    "files = sorted(base_path.glob(\"생활이동_자치구_2025.06_??시.csv\"))\n",
    "\n",
    "# Read and combine\n",
    "dfs = []\n",
    "for file in files:\n",
    "    df = pd.read_csv(file, encoding=\"cp949\")  # encoding might be 'cp949' for Korean\n",
    "    dfs.append(df)\n",
    "\n",
    "combined_df = pd.concat(dfs, ignore_index=True)\n",
    "\n",
    "# Save combined CSV\n",
    "out_file = base_path / \"생활이동_자치구_202506_통합.csv\"\n",
    "combined_df.to_csv(out_file, index=False, encoding=\"utf-8-sig\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "04861436-26d3-4203-b655-86cc2f149a13",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bd540181-b702-486f-985c-51a34a3e24a7",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "id": "f1f02a20-f72e-4c0d-9946-20bf102a26bc",
   "metadata": {},
   "source": [
    "## Merge 1-12"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "1e054922-7f59-4e99-8d8a-88b5fedb38ff",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[Reading] 생활이동_자치구_202501_통합.csv (encoding=utf-8-sig)\n",
      "[Reading] 생활이동_자치구_202501_통합.csv (encoding=utf-8-sig)\n",
      "[Reading] 생활이동_자치구_202502_통합.csv (encoding=utf-8-sig)\n",
      "[Reading] 생활이동_자치구_202502_통합.csv (encoding=utf-8-sig)\n",
      "[Reading] 생활이동_자치구_202503_통합.csv (encoding=utf-8-sig)\n",
      "[Reading] 생활이동_자치구_202503_통합.csv (encoding=utf-8-sig)\n",
      "[Reading] 생활이동_자치구_202504_통합.csv (encoding=utf-8-sig)\n",
      "[Reading] 생활이동_자치구_202504_통합.csv (encoding=utf-8-sig)\n",
      "[Reading] 생활이동_자치구_202505_통합.csv (encoding=utf-8-sig)\n",
      "[Reading] 생활이동_자치구_202505_통합.csv (encoding=utf-8-sig)\n",
      "[Reading] 생활이동_자치구_202506_통합.csv (encoding=utf-8-sig)\n",
      "[Reading] 생활이동_자치구_202506_통합.csv (encoding=utf-8-sig)\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "C:\\Users\\한승우\\AppData\\Local\\Temp\\ipykernel_28688\\42027664.py:193: DeprecationWarning: DataFrameGroupBy.apply operated on the grouping columns. This behavior is deprecated, and in a future version of pandas the grouping columns will be excluded from the operation. Either pass `include_groups=False` to exclude the groupings or explicitly select the grouping columns after groupby to silence this warning.\n",
      "  .apply(lambda g: pd.Series({\"HW_inbound_가중평균\":\n",
      "C:\\Users\\한승우\\AppData\\Local\\Temp\\ipykernel_28688\\42027664.py:201: DeprecationWarning: DataFrameGroupBy.apply operated on the grouping columns. This behavior is deprecated, and in a future version of pandas the grouping columns will be excluded from the operation. Either pass `include_groups=False` to exclude the groupings or explicitly select the grouping columns after groupby to silence this warning.\n",
      "  .apply(lambda g: pd.Series({\"HW_within_가중평균\":\n",
      "C:\\Users\\한승우\\AppData\\Local\\Temp\\ipykernel_28688\\42027664.py:210: DeprecationWarning: DataFrameGroupBy.apply operated on the grouping columns. This behavior is deprecated, and in a future version of pandas the grouping columns will be excluded from the operation. Either pass `include_groups=False` to exclude the groupings or explicitly select the grouping columns after groupby to silence this warning.\n",
      "  .apply(lambda g: pd.Series({\"WH_inbound_가중평균\":\n",
      "C:\\Users\\한승우\\AppData\\Local\\Temp\\ipykernel_28688\\42027664.py:218: DeprecationWarning: DataFrameGroupBy.apply operated on the grouping columns. This behavior is deprecated, and in a future version of pandas the grouping columns will be excluded from the operation. Either pass `include_groups=False` to exclude the groupings or explicitly select the grouping columns after groupby to silence this warning.\n",
      "  .apply(lambda g: pd.Series({\"WH_within_가중평균\":\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "✅ 완료: 스트리밍 집계\n",
      "- OD 레벨:  E:\\불평등 연구\\데이터\\59_출퇴근불평등\\이동데이터\\2025\\OD_월별_HW_WH_TOTAL_2022_streamed.csv\n",
      "- 도착구-월 인바운드/위딘/격차: E:\\불평등 연구\\데이터\\59_출퇴근불평등\\이동데이터\\2025\\DEST_월별_인바운드_위딘_갭_2022_streamed.csv\n"
     ]
    }
   ],
   "source": [
    "# =============== 설정 ===============\n",
    "BASE = Path(r\"E:\\불평등 연구\\데이터\\59_출퇴근불평등\\이동데이터\\2025\")\n",
    "# 파일 패턴: ..._202001_통합.csv ~ ..._202012_통합.csv (대소문자 모두)\n",
    "FILES = sorted([*BASE.glob(\"생활이동_자치구_2025??_통합.csv\"),\n",
    "                *BASE.glob(\"생활이동_자치구_2025??_통합.CSV\")])\n",
    "\n",
    "CHUNK_SIZE = 1_000_000  # 메모리 상황에 따라 500_000 등으로 조정 가능\n",
    "\n",
    "if not FILES:\n",
    "    raise FileNotFoundError(\"폴더에 '생활이동_자치구_2025MM_통합.csv' 파일이 없습니다.\")\n",
    "\n",
    "# =============== 유틸 ===============\n",
    "def norm(s: str) -> str:\n",
    "    \"\"\"열 이름 정규화: 공백/괄호 제거.\"\"\"\n",
    "    s = re.sub(r\"\\s+\", \"\", s)\n",
    "    s = s.replace(\"(\", \"\").replace(\")\", \"\")\n",
    "    return s\n",
    "\n",
    "# 정규화된 원본 헤더명을 표준키로 매핑\n",
    "# (좌: 정규화 키, 우: 표준키)\n",
    "norm_to_std = {\n",
    "    \"대상연월\": \"ym\",\n",
    "    \"출발시군구코드\": \"o\",\n",
    "    \"출발시군구\": \"o\",\n",
    "    \"출발시군\": \"o\",\n",
    "    \"도착시군구코드\": \"d\",\n",
    "    \"도착시군구\": \"d\",\n",
    "    \"도착시군\": \"d\",\n",
    "    \"이동유형\": \"type\",\n",
    "    \"평균이동시간분\": \"avg_min\",\n",
    "    \"평균이동시간\": \"avg_min\",\n",
    "    \"평균이동시간분분\": \"avg_min\",   # 혹시나 특이 케이스 대비\n",
    "    \"평균이동시간분\": \"avg_min\",\n",
    "    \"이동인구합\": \"flow\",\n",
    "    \"이동인구\": \"flow\",\n",
    "}\n",
    "\n",
    "def detect_actual_cols(file_path, encodings=(\"cp949\", \"euc-kr\", \"utf-8-sig\")):\n",
    "    \"\"\"파일별 실제 열 이름 감지 + 인코딩 탐지 (헤더만 읽음).\"\"\"\n",
    "    encoding = None\n",
    "    for enc in encodings:\n",
    "        try:\n",
    "            hdr = pd.read_csv(file_path, nrows=0, encoding=enc)\n",
    "            encoding = enc\n",
    "            break\n",
    "        except UnicodeDecodeError:\n",
    "            continue\n",
    "    if encoding is None:\n",
    "        hdr = pd.read_csv(file_path, nrows=0)  # 최후의 수단\n",
    "\n",
    "    actual = {}\n",
    "    for c in hdr.columns:\n",
    "        key = norm(str(c))\n",
    "        if key in norm_to_std:\n",
    "            std = norm_to_std[key]\n",
    "            # 이미 매핑된 표준키가 없을 때만 기록 (중복 방지)\n",
    "            if std not in actual:\n",
    "                actual[std] = c\n",
    "\n",
    "    needed = [\"ym\", \"o\", \"d\", \"type\", \"avg_min\", \"flow\"]\n",
    "    missing = [k for k in needed if k not in actual]\n",
    "    if missing:\n",
    "        raise KeyError(f\"{file_path.name}: 필요한 열 누락 {missing}. 매핑: {actual}\")\n",
    "    return actual, encoding\n",
    "\n",
    "def wavg(series, weights):\n",
    "    \"\"\"가중평균 (가중치 합이 0이면 NaN 반환).\"\"\"\n",
    "    w = weights.sum()\n",
    "    return (series * weights).sum() / w if w > 0 else np.nan\n",
    "\n",
    "# =============== 전역 누적기 (소형) ===============\n",
    "# 키 = (ym:int, o:int, d:int, type:str)\n",
    "sum_flow = defaultdict(float)     # Σ flow\n",
    "sum_timeflow = defaultdict(float) # Σ (avg_min * flow)\n",
    "\n",
    "# =============== 메인: 파일 스트리밍 처리 ===============\n",
    "for fp in FILES:\n",
    "    actual, enc = detect_actual_cols(fp)\n",
    "    usecols = list(actual.values())\n",
    "    # dtype: 이동유형을 범주형으로 읽어 메모리 절감\n",
    "    chunk_iter = pd.read_csv(\n",
    "        fp,\n",
    "        encoding=enc,\n",
    "        usecols=usecols,\n",
    "        chunksize=CHUNK_SIZE,\n",
    "        dtype={actual[\"type\"]: \"category\"}\n",
    "    )\n",
    "    print(f\"[Reading] {fp.name} (encoding={enc})\")\n",
    "\n",
    "    for i, chunk in enumerate(chunk_iter, 1):\n",
    "        # 표준 컬럼명으로 리네임\n",
    "        chunk = chunk.rename(columns={v: k for k, v in actual.items()})\n",
    "\n",
    "        # HW/WH만 유지\n",
    "        chunk = chunk[chunk[\"type\"].isin([\"HW\", \"WH\"])]\n",
    "        if chunk.empty:\n",
    "            del chunk; gc.collect(); continue\n",
    "\n",
    "        # 이동인구(합) 클린: \"*\", 공백, 콤마 제거 → 숫자\n",
    "        f = (chunk[\"flow\"].astype(str)\n",
    "                        .str.replace(\",\", \"\", regex=False)\n",
    "                        .str.strip()\n",
    "                        .replace({\"*\": \"0\", \"\": \"0\"}))\n",
    "        chunk[\"flow\"] = pd.to_numeric(f, errors=\"coerce\").fillna(0).astype(\"float32\")\n",
    "\n",
    "        # 평균이동시간(분) 숫자화\n",
    "        chunk[\"avg_min\"] = pd.to_numeric(chunk[\"avg_min\"], errors=\"coerce\").astype(\"float32\")\n",
    "\n",
    "        # 코드/연월 숫자화 (nullable Int32)\n",
    "        for col in (\"o\", \"d\", \"ym\"):\n",
    "            chunk[col] = pd.to_numeric(chunk[col], errors=\"coerce\").astype(\"Int32\")\n",
    "\n",
    "        # 필수 키 결측 제거\n",
    "        chunk = chunk.dropna(subset=[\"ym\", \"o\", \"d\"])\n",
    "        if chunk.empty:\n",
    "            del chunk; gc.collect(); continue\n",
    "\n",
    "        # time * flow 컬럼 생성 (float64로 안정적 합산)\n",
    "        chunk[\"tf\"] = (chunk[\"avg_min\"].astype(\"float64\") * chunk[\"flow\"].astype(\"float64\"))\n",
    "\n",
    "        # 청크 내부 집계로 중간 메모리 축소\n",
    "        g = (chunk.groupby([\"ym\", \"o\", \"d\", \"type\"], observed=True, sort=False, as_index=False)\n",
    "                    .agg(flow_sum=(\"flow\", \"sum\"),\n",
    "                         tf_sum=(\"tf\", \"sum\")))\n",
    "\n",
    "        # 전역 누적 합 업데이트\n",
    "        for row in g.itertuples(index=False):\n",
    "            key = (int(row.ym), int(row.o), int(row.d), str(row.type))\n",
    "            sum_flow[key]     += float(row.flow_sum)\n",
    "            sum_timeflow[key] += float(row.tf_sum)\n",
    "\n",
    "        # 청크/임시 객체 정리\n",
    "        del chunk, g, f\n",
    "        if i % 5 == 0:\n",
    "            gc.collect()\n",
    "\n",
    "    gc.collect()\n",
    "\n",
    "# =============== 누적 합 → 소형 집계테이블 ===============\n",
    "records = []\n",
    "for key in sum_flow.keys():\n",
    "    ym, o, d, t = key\n",
    "    N = sum_flow[key]\n",
    "    TF = sum_timeflow.get(key, 0.0)\n",
    "    mean_min = TF / N if N > 0 else np.nan\n",
    "    records.append((ym, o, d, t, N, mean_min))\n",
    "\n",
    "agg = pd.DataFrame(\n",
    "    records,\n",
    "    columns=[\"대상연월\", \"출발시군구코드\", \"도착시군구코드\", \"이동유형\", \"총이동인구\", \"가중평균시간\"]\n",
    ")\n",
    "\n",
    "# =============== HW / WH 분리 ===============\n",
    "hw = (agg[agg[\"이동유형\"] == \"HW\"]\n",
    "      .drop(columns=[\"이동유형\"])\n",
    "      .rename(columns={\"총이동인구\": \"HW_총이동인구\", \"가중평균시간\": \"HW_가중평균시간\"}))\n",
    "\n",
    "wh = (agg[agg[\"이동유형\"] == \"WH\"]\n",
    "      .drop(columns=[\"이동유형\"])\n",
    "      .rename(columns={\"총이동인구\": \"WH_총이동인구\", \"가중평균시간\": \"WH_가중평균시간\"}))\n",
    "\n",
    "# =============== TOTAL(왕복) 생성: HW(o->d) + WH(d->o) 매칭 ===============\n",
    "# WH를 역방향 키로 맞추기: (ym, o=WH_도착, d=WH_출발)\n",
    "wh_rev = wh.rename(columns={\"출발시군구코드\": \"d_rev\", \"도착시군구코드\": \"o_rev\"})\n",
    "wh_rev[\"출발시군구코드\"] = wh_rev[\"o_rev\"]\n",
    "wh_rev[\"도착시군구코드\"] = wh_rev[\"d_rev\"]\n",
    "wh_rev = wh_rev.drop(columns=[\"o_rev\", \"d_rev\"])\n",
    "\n",
    "od_total = pd.merge(\n",
    "    hw, wh_rev,\n",
    "    on=[\"대상연월\", \"출발시군구코드\", \"도착시군구코드\"],\n",
    "    how=\"outer\"\n",
    ")\n",
    "\n",
    "# 합산(왕복 시간): 결측은 0으로 간주(매칭 실패 시)\n",
    "od_total[\"TOTAL_통근시간(분)\"] = od_total[\"HW_가중평균시간\"].fillna(0) + od_total[\"WH_가중평균시간\"].fillna(0)\n",
    "# 아침/저녁 플로우 매칭 품질 참고용\n",
    "od_total[\"TOTAL_플로우_보조지표(최소값)\"] = od_total[[\"HW_총이동인구\", \"WH_총이동인구\"]].min(axis=1)\n",
    "\n",
    "# (선택) 소규모 플로우 셀 제거 예시:\n",
    "# MIN_FLOW = 30\n",
    "# od_total = od_total[(od_total[\"HW_총이동인구\"].fillna(0) >= MIN_FLOW) &\n",
    "#                     (od_total[\"WH_총이동인구\"].fillna(0) >= MIN_FLOW)]\n",
    "\n",
    "# =============== 목적지(도착구)-월 기준 인바운드 vs 위딘 격차 ===============\n",
    "def safe_group_wavg(df, val_col, w_col):\n",
    "    return wavg(df[val_col], df[w_col])\n",
    "\n",
    "# HW 인바운드/위딘 (도착구 기준)\n",
    "hw_inbound = (\n",
    "    hw[hw[\"출발시군구코드\"] != hw[\"도착시군구코드\"]]\n",
    "      .groupby([\"대상연월\", \"도착시군구코드\"], as_index=False)\n",
    "      .apply(lambda g: pd.Series({\"HW_inbound_가중평균\":\n",
    "                                  safe_group_wavg(g, \"HW_가중평균시간\", \"HW_총이동인구\")}))\n",
    "      .rename(columns={\"도착시군구코드\": \"district\"})\n",
    ")\n",
    "\n",
    "hw_within = (\n",
    "    hw[hw[\"출발시군구코드\"] == hw[\"도착시군구코드\"]]\n",
    "      .groupby([\"대상연월\", \"도착시군구코드\"], as_index=False)\n",
    "      .apply(lambda g: pd.Series({\"HW_within_가중평균\":\n",
    "                                  safe_group_wavg(g, \"HW_가중평균시간\", \"HW_총이동인구\")}))\n",
    "      .rename(columns={\"도착시군구코드\": \"district\"})\n",
    ")\n",
    "\n",
    "# WH 인바운드/위딘 (작업구 기준: 출발=work)\n",
    "wh_inbound = (\n",
    "    wh[wh[\"출발시군구코드\"] != wh[\"도착시군구코드\"]]\n",
    "      .groupby([\"대상연월\", \"출발시군구코드\"], as_index=False)\n",
    "      .apply(lambda g: pd.Series({\"WH_inbound_가중평균\":\n",
    "                                  safe_group_wavg(g, \"WH_가중평균시간\", \"WH_총이동인구\")}))\n",
    "      .rename(columns={\"출발시군구코드\": \"district\"})\n",
    ")\n",
    "\n",
    "wh_within = (\n",
    "    wh[wh[\"출발시군구코드\"] == wh[\"도착시군구코드\"]]\n",
    "      .groupby([\"대상연월\", \"출발시군구코드\"], as_index=False)\n",
    "      .apply(lambda g: pd.Series({\"WH_within_가중평균\":\n",
    "                                  safe_group_wavg(g, \"WH_가중평균시간\", \"WH_총이동인구\")}))\n",
    "      .rename(columns={\"출발시군구코드\": \"district\"})\n",
    ")\n",
    "\n",
    "dst_month = (\n",
    "    pd.merge(hw_inbound, hw_within, on=[\"대상연월\", \"district\"], how=\"outer\")\n",
    "      .merge(wh_inbound, on=[\"대상연월\", \"district\"], how=\"outer\")\n",
    "      .merge(wh_within, on=[\"대상연월\", \"district\"], how=\"outer\")\n",
    ")\n",
    "\n",
    "# TOTAL(인바운드/위딘) = HW + WH (합산: 하루 부담 관점)\n",
    "dst_month[\"TOTAL_inbound_가중평균\"] = dst_month[\"HW_inbound_가중평균\"].fillna(0) + dst_month[\"WH_inbound_가중평균\"].fillna(0)\n",
    "dst_month[\"TOTAL_within_가중평균\"]  = dst_month[\"HW_within_가중평균\"].fillna(0)  + dst_month[\"WH_within_가중평균\"].fillna(0)\n",
    "\n",
    "# 격차(시간 불평등)\n",
    "dst_month[\"GAP_inbound_vs_within_HW\"]     = dst_month[\"HW_inbound_가중평균\"] - dst_month[\"HW_within_가중평균\"]\n",
    "dst_month[\"GAP_inbound_vs_within_WH\"]     = dst_month[\"WH_inbound_가중평균\"] - dst_month[\"WH_within_가중평균\"]\n",
    "dst_month[\"GAP_inbound_vs_within_TOTAL\"]  = dst_month[\"TOTAL_inbound_가중평균\"] - dst_month[\"TOTAL_within_가중평균\"]\n",
    "\n",
    "# =============== 저장 ===============\n",
    "OUT_OD  = BASE / \"OD_월별_HW_WH_TOTAL_2025_streamed.csv\"\n",
    "OUT_DST = BASE / \"DEST_월별_인바운드_위딘_갭_2025_streamed.csv\"\n",
    "\n",
    "od_total.to_csv(OUT_OD,  index=False, encoding=\"utf-8-sig\")\n",
    "dst_month.to_csv(OUT_DST, index=False, encoding=\"utf-8-sig\")\n",
    "\n",
    "print(\"✅ 완료: 스트리밍 집계\")\n",
    "print(f\"- OD 레벨:  {OUT_OD}\")\n",
    "print(f\"- 도착구-월 인바운드/위딘/격차: {OUT_DST}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "89aa5d97-8935-4525-b50d-44ea48d8ec35",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d61f9daf-9bcf-4724-9adb-a4f7e80dc1a2",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
